构建LLM Agent

Table of Contents

之前模仿aider的界面写了个LLM的TUI界面,但它本身是没有agent能力的。不过我在上面预留了输入输出的回调函数,所以只要在那些函数内写一些解析执行相关的代码就好了。下面就是我把它作为一个shell agent的简单例子。

提示词

先设计一种格式,要让执行的代码可以被agent识别到并解析,比如下面这样在代码段的上面写一行 @@@execute: [yes/no] 的执行标志,用于设置代码是否自动执行还是通过询问的方式决定是否执行。

@@@execute: [yes/no]
```
```

prompt写成下面这样

self.prompt_template = """
你现在是一个 Bash 代码生成器,你的任务是根据用户的需求生成 Bash 代码。请严格遵守以下格式规范:
1. 代码输出格式
 - 代码段必须包裹在 三重反引号(```) 内。
 - 代码段的上方必须包含 执行标识(@@@execute:),格式如下:
   @@@execute: [yes/no]
   ```
   ```
 - 代码段和执行标识之间不要有空行。
2. 执行标识规则:
 - 当用户表达 明确的执行意图 (如“执行”、“立马执行”、“不要再问我要不要执行”)时,@@@execute: 的值必须设为yes 。
 - 否则,默认设置@@@execute为no 。

3. 每次只能生成一段代码,不得同时生成多段代码。

解析代码

用正则解析出标有@@@execute:的代码段的内容,然后返回标志位和代码的内容。

def parse_execute_code_block(self, text):
    import re
    #pattern = r'@@@execute:\s*(yes|no)?\s*```(.*?)```'
    pattern = r'@@@execute:\s*(yes|no)?\s*```.*?\n(.*?)```'
    match = re.search(pattern, text, re.DOTALL)
    if match:
        execute_value = match.group(1).strip() if match.group(1) else "no"
        code_content = match.group(2).strip()
        return {
            "execute_tag_found": True,
            "execute_value": execute_value,
            "code_content": code_content
        }
    else:
        return {
            "execute_tag_found": False,
            "execute_value": "no",
            "code_content": None
        }

执行

shell的执行无非就是通过bash -c的方式执行这些解析出来的代码。

def execute_bash_code(self, bash_script):
    import subprocess

    result = subprocess.run(
        ["bash", "-c", bash_script],
        text=True,
        capture_output=True
    )

    return {
        "stdout": result.stdout,
        "stderr": result.stderr,
        "return_code": result.returncode
    }

整个agent解析执行的class

def ask_yes_no(question):
    while True:
        response = input(question + " (yes/no): ").strip().lower()
        if response in ['yes', 'y']:
            return True
        elif response in ['no', 'n']:
            return False
        else:
            print("Please enter 'yes' or 'no'.")

class BashPromptExecutor:
    def __init__(self):
        """Initialize with the prompt template"""
        self.prompt_template = """
你现在是一个 Bash 代码生成器,你的任务是根据用户的需求生成 Bash 代码。请严格遵守以下格式规范:
1. 代码输出格式
 - 代码段必须包裹在 三重反引号(```) 内。
 - 代码段的上方必须包含 执行标识(@@@execute:),格式如下:
   @@@execute: [yes/no]
   ```
   ```
 - 代码段和执行标识之间不要有空行。
2. 执行标识规则:
 - 当用户表达 明确的执行意图 (如“执行”、“立马执行”、“不要再问我要不要执行”)时,@@@execute: 的值必须设为yes 。
 - 否则,默认设置@@@execute为no 。

3. 每次只能生成一段代码,不得同时生成多段代码。
"""

    def get_prompt(self):
        return self.prompt_template

    def parse_execute_code_block(self, text):
        import re
        #pattern = r'@@@execute:\s*(yes|no)?\s*```(.*?)```'
        pattern = r'@@@execute:\s*(yes|no)?\s*```.*?\n(.*?)```'
        match = re.search(pattern, text, re.DOTALL)
        if match:
            execute_value = match.group(1).strip() if match.group(1) else "no"
            code_content = match.group(2).strip()
            return {
                "execute_tag_found": True,
                "execute_value": execute_value,
                "code_content": code_content
            }
        else:
            return {
                "execute_tag_found": False,
                "execute_value": "no",
                "code_content": None
            }

    def process_response(self, response_text):
        parse_result = self.parse_execute_code_block(response_text)

        if not parse_result["execute_tag_found"]:
            return {
                "status": "no_executable_code_found",
                "message": "No executable code block with @@@execute: tag was found in the response."
            }

        bash_script = parse_result["code_content"]

        # Check if execute_value is "yes" to determine whether to execute immediately
        if parse_result["execute_value"].lower() == "yes":
            execution_result = self.execute_bash_code(bash_script)
            return {
                "status": "execution_completed",
                "code_executed": bash_script,
                "execution_results": execution_result
            }
        else:
            print(bash_script)
            if ask_yes_no("Confirm execution of the bash script?"):
                execution_result = self.execute_bash_code(bash_script)
                return {
                    "status": "execution_completed",
                    "code_executed": bash_script,
                    "execution_results": execution_result
                }
            else:
                return {
                    "status": "execution_pending",
                    "code_to_execute": bash_script,
                    "message": "User confirmation required before execution"
                }



    def execute_bash_code(self, bash_script):
        import subprocess

        result = subprocess.run(
            ["bash", "-c", bash_script],
            text=True,
            capture_output=True
        )

        return {
            "stdout": result.stdout,
            "stderr": result.stderr,
            "return_code": result.returncode
        }

完整代码

整合到之前写的TUI LLM界面中,就是一个简单的shell agent,它不能执行一些复杂的交互式命令,不过后续你可以自己修改执行的部分,以更多种多样的方式运行。

import os

from colorama import init, Fore, Style
from prompt_toolkit import prompt
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.formatted_text import HTML
from prompt_toolkit.completion import Completer, Completion
from google import genai


class GeminiChat:
    def __init__(self, api_key, initial_message, model="gemini-2.0-flash"):
        self.client = genai.Client(api_key=api_key)
        self.chat = self.client.chats.create(model=model)
        self.history = []
        self._add_to_history("user", initial_message)
        response = self.chat.send_message(initial_message)
        self._add_to_history("assistant", response.text)

    def send(self, message):
        self._add_to_history("user", message)
        response = self.chat.send_message(message)
        self._add_to_history("assistant", response.text)
        return response.text

    def _add_to_history(self, role, text):
        self.history.append({"role": role, "text": text})

    def print_history(self):
        for message in self.history:
            print(f'Role: {message["role"]}')
            print(f'Message: {message["text"]}')
            print("-" * 50)


class TerminalChat:
    class CommandCompleter(Completer):
        def __init__(self, commands):
            self.commands = commands

        def get_completions(self, document, complete_event):
            text = document.text
            if text.startswith("/"):
                word = text[1:]
                for command in self.commands:
                    if command.startswith(word):
                        yield Completion(command, start_position=-len(word))

    def __init__(
        self,
        send_function=None,
        recv_function=None,
        banner_function=None,
        custom_commands=None,
    ):
        self.send_function = send_function or self._default_send_function
        self.banner_function = banner_function or self._default_banner_function
        self.recv_function = recv_function or self._default_recv_function

        self.commands = {
            "quit": {
                "function": self._quit_command,
                "description": "Exit the chat application",
            },
            "help": {
                "function": self._help_command,
                "description": "Display available commands and their descriptions",
            },
        }

        if custom_commands:
            self.commands.update(custom_commands)

        self.completer = self.CommandCompleter(self.commands.keys())

        self.kb = KeyBindings()

        @self.kb.add("c-j")
        def _(event):
            event.current_buffer.insert_text("\n")

    def _default_send_function(self, user_input):
        return f"Echo: {user_input}"

    def _default_banner_function(self):
        print(f"{Fore.GREEN}Welcome to Terminal Chat!{Style.RESET_ALL}")
        print(f"{Fore.GREEN}Type '/help' for available commands{Style.RESET_ALL}")

    def _default_recv_function(self, text):
        print(f"{Fore.GREEN}Assistant: {Style.RESET_ALL}{text}")

    def _quit_command(self, _):
        print(f"{Fore.GREEN}Goodbye!{Style.RESET_ALL}")
        return True

    def _help_command(self, _):
        print(f"{Fore.GREEN}Available commands:{Style.RESET_ALL}")
        for cmd, cmd_info in sorted(self.commands.items()):
            description = cmd_info.get("description", "No description available")
            print(f"{Fore.GREEN}  /{cmd} - {description}{Style.RESET_ALL}")
        return False

    def clear_screen(self):
        os.system("cls" if os.name == "nt" else "clear")

    def print_separator(self):
        print(f"{Fore.GREEN}{'─' * os.get_terminal_size().columns}{Style.RESET_ALL}")

    def print_message(self, role, text):
        print(f"{Fore.GREEN}{role.capitalize()}: {Style.RESET_ALL}{text}")
        print("-" * 50)

    def start(self):
        self.print_separator()
        self.banner_function()

        while True:
            self.print_separator()
            try:
                user_input = prompt(
                    HTML('<style fg="green">></style> '),
                    completer=self.completer,
                    key_bindings=self.kb,
                )

                if not user_input.strip():
                    continue

                if user_input.startswith("/"):
                    command = user_input[1:].lower()
                    if command in self.commands:
                        should_quit = self.commands[command]["function"](user_input)
                        if should_quit:
                            break
                    else:
                        print(
                            f"{Fore.RED}Unknown command: {user_input}{Style.RESET_ALL}"
                        )
                        print(
                            f"{Fore.GREEN}Type '/help' for available commands{Style.RESET_ALL}"
                        )
                else:
                    response = self.send_function(user_input)
                    self.recv_function(response)
            except KeyboardInterrupt:
                os._exit(0)
            except EOFError:
                os._exit(0)

#----------------------------------exec -------------------------------------
def ask_yes_no(question):
    while True:
        response = input(question + " (yes/no): ").strip().lower()
        if response in ['yes', 'y']:
            return True
        elif response in ['no', 'n']:
            return False
        else:
            print("Please enter 'yes' or 'no'.")

class BashPromptExecutor:
    def __init__(self):
        """Initialize with the prompt template"""
        self.prompt_template = """
你现在是一个 Bash 代码生成器,你的任务是根据用户的需求生成 Bash 代码。请严格遵守以下格式规范:
1. 代码输出格式
 - 代码段必须包裹在 三重反引号(```) 内。
 - 代码段的上方必须包含 执行标识(@@@execute:),格式如下:
   @@@execute: [yes/no]
   ```
   ```
 - 代码段和执行标识之间不要有空行。
2. 执行标识规则:
 - 当用户表达 明确的执行意图 (如“执行”、“立马执行”、“不要再问我要不要执行”)时,@@@execute: 的值必须设为yes 。
 - 否则,默认设置@@@execute为no 。

3. 每次只能生成一段代码,不得同时生成多段代码。
"""

    def get_prompt(self):
        return self.prompt_template

    def parse_execute_code_block(self, text):
        import re
        #pattern = r'@@@execute:\s*(yes|no)?\s*```(.*?)```'
        pattern = r'@@@execute:\s*(yes|no)?\s*```.*?\n(.*?)```'
        match = re.search(pattern, text, re.DOTALL)
        if match:
            execute_value = match.group(1).strip() if match.group(1) else "no"
            code_content = match.group(2).strip()
            return {
                "execute_tag_found": True,
                "execute_value": execute_value,
                "code_content": code_content
            }
        else:
            return {
                "execute_tag_found": False,
                "execute_value": "no",
                "code_content": None
            }

    def process_response(self, response_text):
        parse_result = self.parse_execute_code_block(response_text)

        if not parse_result["execute_tag_found"]:
            return {
                "status": "no_executable_code_found",
                "message": "No executable code block with @@@execute: tag was found in the response."
            }

        bash_script = parse_result["code_content"]

        # Check if execute_value is "yes" to determine whether to execute immediately
        if parse_result["execute_value"].lower() == "yes":
            execution_result = self.execute_bash_code(bash_script)
            return {
                "status": "execution_completed",
                "code_executed": bash_script,
                "execution_results": execution_result
            }
        else:
            print(bash_script)
            if ask_yes_no("Confirm execution of the bash script?"):
                execution_result = self.execute_bash_code(bash_script)
                return {
                    "status": "execution_completed",
                    "code_executed": bash_script,
                    "execution_results": execution_result
                }
            else:
                return {
                    "status": "execution_pending",
                    "code_to_execute": bash_script,
                    "message": "User confirmation required before execution"
                }



    def execute_bash_code(self, bash_script):
        import subprocess

        result = subprocess.run(
            ["bash", "-c", bash_script],
            text=True,
            capture_output=True
        )

        return {
            "stdout": result.stdout,
            "stderr": result.stderr,
            "return_code": result.returncode
        }


if __name__ == "__main__":
    api_key = os.getenv("GEMINI_API_KEY")
    executor = BashPromptExecutor()
    initial_message = executor.get_prompt()
    def receive_from_gemini(text):
        result = executor.process_response(text)
        import json
        print("\nExecution Results:")
        print(json.dumps(result, indent=2))
        print(f"{Fore.GREEN}Assistant: {Style.RESET_ALL}{text}")

    if api_key is None:
        print(
            f"{Fore.RED}Please set the GEMINI_API_KEY environment variable.{Style.RESET_ALL}"
        )
    else:
        gemini_chat = GeminiChat(api_key, initial_message)

        def send_to_gemini(message):
            return gemini_chat.send(message)


        def clear_command(_):
            print(f"{Fore.GREEN}Clearing screen...{Style.RESET_ALL}")
            os.system("cls" if os.name == "nt" else "clear")
            return False

        def quit_command(_):
            print(f"{Fore.GREEN}Custom quit!{Style.RESET_ALL}")
            return True

        def history_command(_):
            terminal_chat.clear_screen()
            print(f"{Fore.GREEN}Chat History:{Style.RESET_ALL}")
            for message in gemini_chat.history:
                terminal_chat.print_message(message["role"], message["text"])
            return False

        def gemini_banner_function():
            print(f"{Fore.GREEN}Welcome to Gemini Terminal Chat!{Style.RESET_ALL}")
            print(
                f"{Fore.GREEN}Type '/quit' to quit, '/history' to see chat history, '/help' for more commands{Style.RESET_ALL}"
            )
            print(
                f"{Fore.GREEN}Press Ctrl+C or Ctrl+D to quit directly{Style.RESET_ALL}"
            )

        custom_commands = {
            "clear": {
                "function": clear_command,
                "description": "Clear the terminal screen",
            },
            "quit": {"function": quit_command, "description": "Exit the application"},
            "history": {
                "function": history_command,
                "description": "Display the full conversation history",
            },
        }

        terminal_chat = TerminalChat(
            send_function=send_to_gemini,
            recv_function=receive_from_gemini,
            banner_function=gemini_banner_function,
            custom_commands=custom_commands,
        )

        terminal_chat.start()